

source('scripts/cleaning_utils.r')

# Clean one FEC contribution table per two-year cycle (2008, 2010, ..., 2020):
# read the raw Arrow dataset, keep individual contributors, normalise name and
# address fields, attach a gender code, build exact-match person groups, and
# write the result to data/fec_clean/ as a partitioned dataset.
#
# Helpers used below (format_reverse, format_period, parse_names,
# normalize_zip, po_box_parser_fec, merge_fields, usps_address) and the `names`
# lookup table are not defined in this file; presumably they come from
# scripts/cleaning_utils.r -- confirm there for their exact semantics.
for(year in seq(2008, 2020, 2)){
  # Re-seed every cycle so the random draws below are reproducible per cycle.
  set.seed(0)
  
  # Load the raw cycle, drop committee-level columns, and lower-case every
  # contributor text field before pulling the data into memory.
  dat = glue('data/fec_arrow/cycle_table={year}/') %>%
    open_dataset() %>%
    select(-cmte_tp, -org_tp, -cmte_dsgn) %>%
    mutate(contbr_nm = str_to_lower(contbr_nm),
           contbr_nm_first = str_to_lower(contbr_nm_first),
           contbr_m_nm = str_to_lower(contbr_m_nm),
           contbr_nm_last = str_to_lower(contbr_nm_last),
           contbr_prefix = str_to_lower(contbr_prefix),
           contbr_suffix = str_to_lower(contbr_suffix),
           contbr_st1 = str_to_lower(contbr_st1),
           contbr_st2 = str_to_lower(contbr_st2),
           contbr_city = str_to_lower(contbr_city),
           contbr_st = str_to_lower(contbr_st),
           contbr_zip = str_to_lower(contbr_zip)) %>%
    collect() %>%
    slice(-1) %>% # get rid of header from csv
    as.data.table()
  
  # Keep only rows flagged as individuals by either indicator.
  dat = dat[is_individual == 't' | entity_tp == 'IND',]
  
  # `cores` is a chunk label (1..60) per row, not a core count. It is sorted so
  # that split() yields contiguous chunks and unlist() restores row order.
  cores = sort(sample(1:60, nrow(dat), replace = TRUE))
  # NOTE(review): str_remove() strips only the FIRST non-alphanumeric character
  # of each state value; confirm whether str_remove_all() was intended.
  dat[, contbr_st := split(dat$contbr_st, cores) %>% mclapply(function(x) str_remove(x, '[^[:alnum:]]'), mc.cores = 20) %>% unlist(use.names=F)]
  
  # Treat empty strings as missing in every column.
  cols = names(dat)
  dat[,(cols) := mclapply(.SD, function(x) na_if(x, ''), mc.cores = 20), .SDcols = (cols)]
  
  # Amount columns -> double.
  cols = c('contb_aggregate_ytd', 'contb_receipt_amt')
  dat[,(cols) := mclapply(.SD, as.double, mc.cores = 2), .SDcols = (cols)]
  
  # Year columns -> integer.
  cols = c('rpt_yr', 'two_year_transaction_period')
  dat[,(cols) := mclapply(.SD, as.integer, mc.cores = 2), .SDcols = (cols)]
  
  dat[,cycle := as.integer(year)]
  
  # Row number at this point in the pipeline; used to write parsed names back
  # to the right rows and as the last component of `emmid`.
  dat[, row_n := .I]
  
  if(year <= 2014){ # parse names of individuals with unparsed names
    
    # Rows where none of first/middle/last name is populated.
    parse_these = dat[is.na(contbr_nm_first) & is.na(contbr_m_nm) & is.na(contbr_nm_last), .(row_n, contbr_nm)]
    # Drop a trailing "& mr" fragment.
    parse_these[, contbr_nm := str_remove(contbr_nm, '\\s+&\\s+mr(\\s+)?(\\.)?(\\s+)?$') %>% str_squish()]
    # Drop a trailing honorific token. The pattern is anchored so that only a
    # token that IS an honorific matches; unanchored, any last token merely
    # containing "mr"/"ms"/"dr" (e.g. "andrew", "sandra") was being removed.
    parse_these[, contbr_nm := fifelse(str_remove(contbr_nm, '.* ') %>% str_detect('^(mr|mrs|ms|dr)(\\.)?$'), str_remove(contbr_nm, ' [^ ]*$'), contbr_nm)]
    # If a generational suffix appears before a comma AND again at the end,
    # drop the trailing duplicate.
    parse_these[, contbr_nm := fifelse(str_detect(contbr_nm, '.*(jr|sr|ii|iii|iv),.*') & str_ends(contbr_nm, ' (jr|sr|ii|iii|iv)(\\.)?'), str_remove(contbr_nm, ' (jr|sr|ii|iii|iv)(\\.)?$'), contbr_nm)]
    
    # Run the cleaned strings through the name-parsing helpers and keep the
    # salutation..suffix columns of the result.
    parsed = fifelse(parse_these$contbr_nm == '', NA_character_, parse_these$contbr_nm) %>%
      format_reverse() %>%
      format_period() %>%
      parse_names() %>%
      select(salutation:suffix) %>%
      as.data.table()
    
    # Write the parsed components back by row position (row_n == .I above).
    dat[parse_these$row_n, c('contbr_prefix', 'contbr_nm_first', 'contbr_m_nm', 'contbr_nm_last', 'contbr_suffix') := parsed]
    
  }
  
  dat[, contbr_nm := NULL]
  # Record id of the form "<state>-<two-digit year>-<row number>".
  dat[, emmid := str_c(str_replace_na(contbr_st, ''), str_sub(year, 3, 4), row_n, sep = '-')]
  
  # Strip punctuation and '+' from name, street and city fields.
  cols = c('contbr_st1', 'contbr_st2', 'contbr_nm_last', 'contbr_nm_first', 'contbr_m_nm', 'contbr_suffix', 'contbr_prefix', 'contbr_city')
  dat[, (cols) := mclapply(.SD, function(x) str_remove_all(x, '[[:punct:]]|\\+'), mc.cores = length(cols)), .SDcols = (cols)] # might need to add ( or ) 
  
  # Fresh sorted chunk labels for the remaining chunked mclapply() calls.
  # Kept as a second draw so the random stream seen by rbernoulli() further
  # down is unchanged.
  cores = sort(sample(1:60, nrow(dat), replace = TRUE))
  dat[, contbr_zip := split(contbr_zip, cores) %>% mclapply(normalize_zip, mc.cores = 20) %>% unlist(use.names=F)]
  
  # PO box number derived from the two street lines (helper defined elsewhere).
  dat[, po_num := po_box_parser_fec(dat$contbr_st1, dat$contbr_st2)]
  
  cols = c('contbr_nm_first', 'contbr_m_nm', 'contbr_nm_last', 'contbr_prefix', 'contbr_suffix', 'contbr_city')
  
  dat[, (cols) := mclapply(.SD, str_squish, mc.cores = length(cols)), .SDcols = (cols)]
  
  # Combined first+middle, last+suffix and street1+street2 fields; the inputs
  # that are no longer needed are dropped right after each merge.
  dat[, contbr_name_fm := split(dat[, .(contbr_nm_first, contbr_m_nm)], cores) %>% mclapply(function(x) merge_fields(x$contbr_nm_first, x$contbr_m_nm), mc.cores = 20) %>% unlist(use.names=F)]
  dat[, contbr_m_nm := NULL]
  
  dat[, contbr_name_ls := split(dat[, .(contbr_nm_last, contbr_suffix)], cores) %>% mclapply(function(x) merge_fields(x$contbr_nm_last, x$contbr_suffix), mc.cores = 20) %>% unlist(use.names=F)]
  dat[, contbr_suffix := NULL]
  
  dat[, contbr_addr := split(dat[, .(contbr_st1, contbr_st2)], cores) %>% mclapply(function(x) merge_fields(x$contbr_st1, x$contbr_st2), mc.cores = 20) %>% unlist(use.names=F)]
  dat[, contbr_st1 := NULL]
  dat[, contbr_st2 := NULL]
  
  # Reduce first name to its first token. For last name, first glue the
  # leading particles "mc", "o", "st", "de" onto the following word, then keep
  # the first token.
  dat[, contbr_nm_first := split(dat$contbr_nm_first, cores) %>% mclapply(function(x) str_squish(str_remove(x, ' .*$')), mc.cores = 20) %>% unlist(use.names=F)]
  dat[, contbr_nm_last := split(dat$contbr_nm_last, cores) %>% mclapply(function(x) str_squish(str_remove(str_replace_all(x, c('^mc ' = 'mc', '^o ' = 'o', '^st ' = 'st', '^de ' = 'de')), ' .*$')), mc.cores = 20) %>% unlist(use.names=F)]
  
  dat[, contbr_addr := split(dat$contbr_addr, cores) %>% mclapply(usps_address, mc.cores = 20) %>% unlist(use.names=F)]
  
  dat[, contbr_city := split(dat$contbr_city, cores) %>% mclapply(str_squish, mc.cores = 20) %>% unlist(use.names=F)]
  
  # The steps above can leave empty strings behind; turn them back into NA.
  cols = c('contbr_addr', 'po_num')
  dat[, (cols) := mclapply(.SD, function(x) na_if(x, ''), mc.cores = length(cols)), .SDcols = (cols)]
  
  # When a PO box number is present, blank out the street address.
  dat[, contbr_addr := fifelse(!is.na(po_num), NA_character_, contbr_addr)]
  
  # Drop rows that have no name at all (both merged name fields missing) or no
  # address at all (both street address and PO box missing).
  dat = dat[(!(is.na(contbr_name_fm)&is.na(contbr_name_ls))) & (!(is.na(contbr_addr)&is.na(po_num))),]
  
  setindex(dat, 'contbr_nm_first')
  
  # `names` is renamed BY REFERENCE, so after the first cycle the column is
  # already called contbr_nm_first. Without skip_absent = TRUE the second
  # iteration stops with "Items of 'old' not found in column names".
  setnames(names, 'first_name', 'contbr_nm_first', skip_absent = TRUE)
  setindex(names, 'contbr_nm_first')
  
  # Join keeping every row of `dat`; columns of `names` are attached by first
  # name (presumably a first-name -> gender lookup -- confirm in the utils).
  dat = names[dat, on = 'contbr_nm_first']
  
  # Gender code: m -> 0, unknown (no match) -> 1, f -> 2.
  dat[, gender := fifelse(is.na(gender), 'u', gender)]
  dat[, gender := fcase(gender == 'm', 0L, gender == 'u', 1L, gender == 'f', 2L)]
  
  dat[, contb_receipt_dt := as_date(contb_receipt_dt)]
  
  if(year <= 2012){ # fix prefixes and suffixes
    
    # If the last token of the first/middle field is a generational suffix,
    # append it to the last-name field; then strip a trailing suffix, title or
    # "esq" token from the first/middle field.
    dat[, contbr_name_ls := fifelse(str_detect(str_remove(contbr_name_fm, '.*\\ '), '^(j|s)r$|^ii(i)?$'), str_c(contbr_name_ls, ' ', str_remove(contbr_name_fm, '.*\\ ')), contbr_name_ls)]
    dat[, contbr_name_fm := fifelse(str_detect(str_remove(contbr_name_fm, '.*\\ '), '^(j|s|d)r$|^m(r|s|d)?$|^esq$|^ii(i)?$'), str_remove(contbr_name_fm, '\\s+[^ ]+$'), contbr_name_fm)]
    
    # Second pass for names that carried two such trailing tokens.
    dat[, contbr_name_ls := fifelse(str_detect(str_remove(contbr_name_fm, '.*\\ '), '^(j|s)r$|^ii(i)?$'), str_c(contbr_name_ls, ' ', str_remove(contbr_name_fm, '.*\\ ')), contbr_name_ls) %>% str_squish()]
    dat[, contbr_name_fm := fifelse(str_detect(str_remove(contbr_name_fm, '.*\\ '), '^(j|s|d)r$|^m(r|s|d)?$|^esq$|^ii(i)?$'), str_remove(contbr_name_fm, '\\s+[^ ]+$'), contbr_name_fm) %>% str_squish()]
    
  }
  
  # Create exact-match group IDs and the dedupe-sample flag.
  
  cols = c('contbr_name_fm', 'contbr_name_ls', 'contbr_addr', 'po_num', 'contbr_zip', 'contbr_city', 'contbr_st')
  
  # Within each group of rows identical on `cols`: the first row is always
  # flagged 1, the second is flagged 1 with probability 0.2, the rest get 0.
  dat[, incl_in_dedupe := seq_len(.N), by = cols]
  dat[, incl_in_dedupe := fcase(incl_in_dedupe == 1, 1L, incl_in_dedupe == 2L, as.integer(rbernoulli(.N, p = 0.2)), default = 0L)]
  
  # One integer id per distinct combination of `cols`.
  dat[, exact_person_id := .GRP, by = cols]
  
  # Write the cleaned cycle, partitioned by dedupe flag and state.
  write_dataset(
    dat,
    glue('data/fec_clean/cycle_table={year}'),
    partitioning = c('incl_in_dedupe', 'contbr_st'),
    max_rows_per_file = 5e5
    )
  
}
